/*
 * librdkafka - The Apache Kafka C/C++ library
 *
 * Copyright (c) 2021-2022, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */


/**
 * @name HTTP client
 *
 */

#include "rdkafka_int.h"
#include "rdunittest.h"

#include <stdarg.h>

#include <curl/curl.h>
#include "rdhttp.h"

#if WITH_SSL
#include "rdkafka_ssl.h"
#endif

/** Maximum response size, increase as necessary. */
#define RD_HTTP_RESPONSE_SIZE_MAX 1024 * 1024 * 500 /* 500kb */


void rd_http_error_destroy(rd_http_error_t *herr) {
        rd_free(herr);
}

static rd_http_error_t *rd_http_error_new(int code, const char *fmt, ...)
    RD_FORMAT(printf, 2, 3);
static rd_http_error_t *rd_http_error_new(int code, const char *fmt, ...) {
        size_t len = 0;
        rd_http_error_t *herr;
        va_list ap;

        va_start(ap, fmt);

        if (fmt && *fmt) {
                va_list ap2;
                va_copy(ap2, ap);
                len = rd_vsnprintf(NULL, 0, fmt, ap2);
                va_end(ap2);
        }

        /* Use single allocation for both herr and the error string */
        herr         = rd_malloc(sizeof(*herr) + len + 1);
        herr->code   = code;
        herr->errstr = herr->data;

        if (len > 0)
                rd_vsnprintf(herr->errstr, len + 1, fmt, ap);
        else
                herr->errstr[0] = '\0';

        va_end(ap);

        return herr;
}

/**
 * @brief Same as rd_http_error_new() but reads the error string from the
 *        provided buffer.
 */
static rd_http_error_t *rd_http_error_new_from_buf(int code,
                                                   const rd_buf_t *rbuf) {
        rd_http_error_t *herr;
        rd_slice_t slice;
        size_t len = rd_buf_len(rbuf);

        if (len == 0)
                return rd_http_error_new(
                    code, "Server did not provide an error string");


        /* Use single allocation for both herr and the error string */
        herr         = rd_malloc(sizeof(*herr) + len + 1);
        herr->code   = code;
        herr->errstr = herr->data;
        rd_slice_init_full(&slice, rbuf);
        rd_slice_read(&slice, herr->errstr, len);
        herr->errstr[len] = '\0';

        return herr;
}

void rd_http_req_destroy(rd_http_req_t *hreq) {
        RD_IF_FREE(hreq->hreq_curl, curl_easy_cleanup);
        RD_IF_FREE(hreq->hreq_buf, rd_buf_destroy_free);
}


/**
 * @brief Curl writefunction. Writes the bytes passed from curl
 *        to the hreq's buffer.
 */
static size_t
rd_http_req_write_cb(char *ptr, size_t size, size_t nmemb, void *userdata) {
        rd_http_req_t *hreq = (rd_http_req_t *)userdata;

        if (unlikely(rd_buf_len(hreq->hreq_buf) + nmemb >
                     RD_HTTP_RESPONSE_SIZE_MAX))
                return 0; /* FIXME: Set some overflow flag or rely on curl? */

        rd_buf_write(hreq->hreq_buf, ptr, nmemb);

        return nmemb;
}

#if WITH_SSL
/**
 * @brief Callback function for setting up the SSL_CTX for HTTPS requests.
 *
 * This function sets the default CA paths for the SSL_CTX, and if that fails,
 * it attempts to probe and set a default CA location. If `probe` is forced
 * it skips the default CA paths and directly probes for CA certificates.
 *
 * On Windows, it attempts to load CA root certificates from the
 * configured Windows certificate stores before falling back to the default.
 *
 * @return `CURLE_OK` on success, or `CURLE_SSL_CACERT_BADFILE` on failure.
 */
static CURLcode
rd_http_ssl_ctx_function(CURL *curl, void *sslctx, void *userptr) {
        SSL_CTX *ctx   = (SSL_CTX *)sslctx;
        rd_kafka_t *rk = (rd_kafka_t *)userptr;
        int r          = -1;
        rd_bool_t force_probe =
            !rd_strcmp(rk->rk_conf.https.ca_location, "probe");
        rd_bool_t use_probe = force_probe;

#if WITH_STATIC_LIB_libcrypto
        /* We fallback to `probe` when statically linked. */
        use_probe = rd_true;
#endif

#ifdef _WIN32
        /* Attempt to load CA root certificates from the
         * configured Windows certificate stores. */
        r = rd_kafka_ssl_win_load_cert_stores(rk, "https", ctx,
                                              rk->rk_conf.ssl.ca_cert_stores);
        if (r == 0) {
                rd_kafka_log(rk, LOG_NOTICE, "CERTSTORE",
                             "No CA certificates loaded for `https` from "
                             "Windows certificate stores: "
                             "falling back to default OpenSSL CA paths");
                r = -1;
        } else if (r == -1)
                rd_kafka_log(rk, LOG_NOTICE, "CERTSTORE",
                             "Failed to load CA certificates for `https` from "
                             "Windows certificate stores: "
                             "falling back to default OpenSSL CA paths");

        if (r != -1) {
                rd_kafka_dbg(rk, SECURITY, "SSL",
                             "Successfully loaded CA certificates for `https` "
                             "from Windows certificate stores");
                return CURLE_OK; /* Success, CA certs loaded on Windows */
        }
#endif

        if (!force_probe) {
                /* Previous default behavior: use predefined paths set when
                 * building OpenSSL. */
                char errstr[512];
                r = SSL_CTX_set_default_verify_paths(ctx);
                if (r == 1) {
                        rd_kafka_dbg(rk, SECURITY, "SSL",
                                     "SSL_CTX_set_default_verify_paths() "
                                     "for `https` "
                                     "succeeded");
                        return CURLE_OK; /* Success */
                }

                /* Read error and clear the error stack. */
                rd_kafka_ssl_error0(rk, NULL, "https", errstr, sizeof(errstr));
                rd_kafka_dbg(rk, SECURITY, "SSL",
                             "SSL_CTX_set_default_verify_paths() "
                             "for `https` "
                             "failed: %s",
                             errstr);
        }

        if (use_probe) {
                /* We asked for probing or we're using
                 * a statically linked version of OpenSSL. */

                r = rd_kafka_ssl_probe_and_set_default_ca_location(rk, "https",
                                                                   ctx);
                if (r == 0)
                        return CURLE_OK;
        }

        return CURLE_SSL_CACERT_BADFILE;
}

static void rd_http_ssl_configure(rd_kafka_t *rk, CURL *hreq_curl) {
        rd_bool_t force_probe =
            !rd_strcmp(rk->rk_conf.https.ca_location, "probe");

        if (!force_probe && rk->rk_conf.https.ca_location) {
                rd_bool_t is_dir;
                rd_kafka_dbg(rk, SECURITY, "SSL",
                             "Setting `https` CA certs from "
                             "configured location: %s",
                             rk->rk_conf.https.ca_location);
                if (rd_file_stat(rk->rk_conf.https.ca_location, &is_dir)) {
                        if (is_dir) {
                                curl_easy_setopt(hreq_curl, CURLOPT_CAPATH,
                                                 rk->rk_conf.https.ca_location);
                                curl_easy_setopt(hreq_curl, CURLOPT_CAINFO,
                                                 NULL);
                        } else {
                                curl_easy_setopt(hreq_curl, CURLOPT_CAPATH,
                                                 NULL);
                                curl_easy_setopt(hreq_curl, CURLOPT_CAINFO,
                                                 rk->rk_conf.https.ca_location);
                        }
                } else {
                        /* Path doesn't exist, don't set any trusted
                         * certificate. */
                        curl_easy_setopt(hreq_curl, CURLOPT_CAINFO, NULL);
                        curl_easy_setopt(hreq_curl, CURLOPT_CAPATH, NULL);
                }
        } else if (!force_probe && rk->rk_conf.https.ca_pem) {
#if CURL_AT_LEAST_VERSION(7, 77, 0)
                struct curl_blob ca_blob = {
                    .data  = rk->rk_conf.https.ca_pem,
                    .len   = strlen(rk->rk_conf.https.ca_pem),
                    .flags = CURL_BLOB_COPY};
                rd_kafka_dbg(rk, SECURITY, "SSL",
                             "Setting `https` CA certs from "
                             "configured PEM string");
                curl_easy_setopt(hreq_curl, CURLOPT_CAINFO_BLOB, &ca_blob);
#endif
                /* Only the blob should be set, no default paths. */
                curl_easy_setopt(hreq_curl, CURLOPT_CAINFO, NULL);
                curl_easy_setopt(hreq_curl, CURLOPT_CAPATH, NULL);
        } else {
                curl_easy_setopt(hreq_curl, CURLOPT_SSL_CTX_FUNCTION,
                                 rd_http_ssl_ctx_function);
                curl_easy_setopt(hreq_curl, CURLOPT_SSL_CTX_DATA, rk);
        }
}
#endif

rd_http_error_t *
rd_http_req_init(rd_kafka_t *rk, rd_http_req_t *hreq, const char *url) {
        memset(hreq, 0, sizeof(*hreq));

        hreq->hreq_curl = curl_easy_init();
        if (!hreq->hreq_curl)
                return rd_http_error_new(-1, "Failed to create curl handle");

        hreq->hreq_buf = rd_buf_new(1, 1024);

        curl_easy_setopt(hreq->hreq_curl, CURLOPT_URL, url);
#if CURL_AT_LEAST_VERSION(7, 85, 0)
        curl_easy_setopt(hreq->hreq_curl, CURLOPT_PROTOCOLS_STR, "http,https");
#else
        /* As of 06/10/2025 Debian 10 and CentOS Stream 9 ship with
         * older CURL versions, remove this condition once they're not supported
         * anymore. */
        curl_easy_setopt(hreq->hreq_curl, CURLOPT_PROTOCOLS,
                         CURLPROTO_HTTP | CURLPROTO_HTTPS);
#endif
        curl_easy_setopt(hreq->hreq_curl, CURLOPT_MAXREDIRS, 16);
        curl_easy_setopt(hreq->hreq_curl, CURLOPT_TIMEOUT, 30);
        curl_easy_setopt(hreq->hreq_curl, CURLOPT_ERRORBUFFER,
                         hreq->hreq_curl_errstr);
        curl_easy_setopt(hreq->hreq_curl, CURLOPT_NOSIGNAL, 1);
        curl_easy_setopt(hreq->hreq_curl, CURLOPT_WRITEFUNCTION,
                         rd_http_req_write_cb);
        curl_easy_setopt(hreq->hreq_curl, CURLOPT_WRITEDATA, (void *)hreq);

#if WITH_SSL
        rd_http_ssl_configure(rk, hreq->hreq_curl);
#endif

        return NULL;
}

/**
 * @brief Synchronously (blockingly) perform the HTTP operation.
 */
rd_http_error_t *rd_http_req_perform_sync(rd_http_req_t *hreq) {
        CURLcode res;
        long code = 0;

        res = curl_easy_perform(hreq->hreq_curl);
        if (unlikely(res != CURLE_OK))
                return rd_http_error_new(-1, "%s", hreq->hreq_curl_errstr);

        curl_easy_getinfo(hreq->hreq_curl, CURLINFO_RESPONSE_CODE, &code);
        hreq->hreq_code = (int)code;
        if (hreq->hreq_code >= 400)
                return rd_http_error_new_from_buf(hreq->hreq_code,
                                                  hreq->hreq_buf);

        return NULL;
}


int rd_http_req_get_code(const rd_http_req_t *hreq) {
        return hreq->hreq_code;
}

const char *rd_http_req_get_content_type(rd_http_req_t *hreq) {
        const char *content_type = NULL;

        if (curl_easy_getinfo(hreq->hreq_curl, CURLINFO_CONTENT_TYPE,
                              &content_type))
                return NULL;

        return content_type;
}


/**
 * @brief Perform a blocking HTTP(S) request to \p url.
 *
 * Returns the response (even if there's a HTTP error code returned)
 * in \p *rbufp.
 *
 * Returns NULL on success (HTTP response code < 400), or an error
 * object on transport or HTTP error - this error object must be destroyed
 * by calling rd_http_error_destroy(). In case of HTTP error the \p *rbufp
 * may be filled with the error response.
 */
rd_http_error_t *
rd_http_get(rd_kafka_t *rk, const char *url, rd_buf_t **rbufp) {
        rd_http_req_t hreq;
        rd_http_error_t *herr;

        *rbufp = NULL;

        herr = rd_http_req_init(rk, &hreq, url);
        if (unlikely(herr != NULL))
                return herr;

        herr = rd_http_req_perform_sync(&hreq);
        if (herr) {
                rd_http_req_destroy(&hreq);
                return herr;
        }

        *rbufp        = hreq.hreq_buf;
        hreq.hreq_buf = NULL;

        return NULL;
}


/**
 * @brief Extract the JSON object from \p hreq and return it in \p *jsonp.
 *
 * @returns Returns NULL on success, or an JSON parsing error - this
 *          error object must be destroyed by calling rd_http_error_destroy().
 */
rd_http_error_t *rd_http_parse_json(rd_http_req_t *hreq, cJSON **jsonp) {
        size_t len;
        char *raw_json;
        const char *end = NULL;
        rd_slice_t slice;
        rd_http_error_t *herr = NULL;

        /* cJSON requires the entire input to parse in contiguous memory. */
        rd_slice_init_full(&slice, hreq->hreq_buf);
        len = rd_buf_len(hreq->hreq_buf);

        raw_json = rd_malloc(len + 1);
        rd_slice_read(&slice, raw_json, len);
        raw_json[len] = '\0';

        /* Parse JSON */
        *jsonp = cJSON_ParseWithOpts(raw_json, &end, 0);

        if (!*jsonp)
                herr = rd_http_error_new(hreq->hreq_code,
                                         "Failed to parse JSON response "
                                         "at %" PRIusz "/%" PRIusz,
                                         (size_t)(end - raw_json), len);
        rd_free(raw_json);
        return herr;
}


/**
 * @brief Check if the error returned from HTTP(S) is temporary or not.
 *
 * @returns If the \p error_code is temporary, return rd_true,
 *          otherwise return rd_false.
 *
 * @locality Any thread.
 */
static rd_bool_t rd_http_is_failure_temporary(int error_code) {
        switch (error_code) {
        case 408: /**< Request timeout */
        case 425: /**< Too early */
        case 429: /**< Too many requests */
        case 500: /**< Internal server error */
        case 502: /**< Bad gateway */
        case 503: /**< Service unavailable */
        case 504: /**< Gateway timeout */
                return rd_true;

        default:
                return rd_false;
        }
}


/**
 * @brief Perform a blocking HTTP(S) request to \p url with
 *        HTTP(S) headers and data with \p timeout_s.
 *        If the HTTP(S) request fails, will retry another \p retries times
 *        with multiplying backoff \p retry_ms.
 *
 * @returns The result will be returned in \p *jsonp.
 *          Returns NULL on success (HTTP response code < 400), or an error
 *          object on transport, HTTP error or a JSON parsing error - this
 *          error object must be destroyed by calling rd_http_error_destroy().
 *
 * @locality Any thread.
 */
rd_http_error_t *rd_http_post_expect_json(rd_kafka_t *rk,
                                          const char *url,
                                          const struct curl_slist *headers,
                                          const char *post_fields,
                                          size_t post_fields_size,
                                          int timeout_s,
                                          int retries,
                                          int retry_ms,
                                          cJSON **jsonp) {
        rd_http_error_t *herr;
        rd_http_req_t hreq;
        int i;
        size_t len;
        const char *content_type;

        herr = rd_http_req_init(rk, &hreq, url);
        if (unlikely(herr != NULL))
                return herr;

        curl_easy_setopt(hreq.hreq_curl, CURLOPT_HTTPHEADER, headers);
        curl_easy_setopt(hreq.hreq_curl, CURLOPT_TIMEOUT, timeout_s);

        curl_easy_setopt(hreq.hreq_curl, CURLOPT_POSTFIELDSIZE,
                         post_fields_size);
        curl_easy_setopt(hreq.hreq_curl, CURLOPT_POSTFIELDS, post_fields);

        for (i = 0; i <= retries; i++) {
                if (rd_kafka_terminating(rk)) {
                        rd_http_req_destroy(&hreq);
                        return rd_http_error_new(-1, "Terminating");
                }

                herr = rd_http_req_perform_sync(&hreq);
                len  = rd_buf_len(hreq.hreq_buf);

                if (!herr) {
                        if (len > 0)
                                break; /* Success */
                        /* Empty response */
                        rd_http_req_destroy(&hreq);
                        return NULL;
                }
                /* Retry if HTTP(S) request returns temporary error and there
                 * are remaining retries, else fail. */
                if (i == retries || !rd_http_is_failure_temporary(herr->code)) {
                        rd_http_req_destroy(&hreq);
                        return herr;
                }

                /* Retry */
                rd_http_error_destroy(herr);
                rd_usleep(retry_ms * 1000 * (i + 1), &rk->rk_terminate);
        }

        content_type = rd_http_req_get_content_type(&hreq);

        if (!content_type || rd_strncasecmp(content_type, "application/json",
                                            strlen("application/json"))) {
                if (!herr)
                        herr = rd_http_error_new(
                            hreq.hreq_code, "Response is not JSON encoded: %s",
                            content_type ? content_type : "(n/a)");
                rd_http_req_destroy(&hreq);
                return herr;
        }

        herr = rd_http_parse_json(&hreq, jsonp);

        rd_http_req_destroy(&hreq);

        return herr;
}


/**
 * @brief Same as rd_http_get() but requires a JSON response.
 *        The response is parsed and a JSON object is returned in \p *jsonp.
 *
 * Same error semantics as rd_http_get().
 */
rd_http_error_t *
rd_http_get_json(rd_kafka_t *rk, const char *url, cJSON **jsonp) {
        rd_http_req_t hreq;
        rd_http_error_t *herr;
        rd_slice_t slice;
        size_t len;
        const char *content_type;
        char *raw_json;
        const char *end;

        *jsonp = NULL;

        herr = rd_http_req_init(rk, &hreq, url);
        if (unlikely(herr != NULL))
                return herr;

        // FIXME: send Accept: json.. header?

        herr = rd_http_req_perform_sync(&hreq);
        len  = rd_buf_len(hreq.hreq_buf);
        if (herr && len == 0) {
                rd_http_req_destroy(&hreq);
                return herr;
        }

        if (len == 0) {
                /* Empty response: create empty JSON object */
                *jsonp = cJSON_CreateObject();
                rd_http_req_destroy(&hreq);
                return NULL;
        }

        content_type = rd_http_req_get_content_type(&hreq);

        if (!content_type || rd_strncasecmp(content_type, "application/json",
                                            strlen("application/json"))) {
                if (!herr)
                        herr = rd_http_error_new(
                            hreq.hreq_code, "Response is not JSON encoded: %s",
                            content_type ? content_type : "(n/a)");
                rd_http_req_destroy(&hreq);
                return herr;
        }

        /* cJSON requires the entire input to parse in contiguous memory. */
        rd_slice_init_full(&slice, hreq.hreq_buf);
        raw_json = rd_malloc(len + 1);
        rd_slice_read(&slice, raw_json, len);
        raw_json[len] = '\0';

        /* Parse JSON */
        end    = NULL;
        *jsonp = cJSON_ParseWithOpts(raw_json, &end, 0);
        if (!*jsonp && !herr)
                herr = rd_http_error_new(hreq.hreq_code,
                                         "Failed to parse JSON response "
                                         "at %" PRIusz "/%" PRIusz,
                                         (size_t)(end - raw_json), len);

        rd_free(raw_json);
        rd_http_req_destroy(&hreq);

        return herr;
}


void rd_http_global_init(void) {
        curl_global_init(CURL_GLOBAL_DEFAULT);
}


/**
 * @brief Unittest. Requires a (local) webserver to be set with env var
 *        RD_UT_HTTP_URL=http://localhost:1234/some-path
 *
 * This server must return a JSON object or array containing at least one
 * object on the main URL with a 2xx response code,
 * and 4xx response on $RD_UT_HTTP_URL/error (with whatever type of body).
 */

int unittest_http(void) {
        const char *base_url = rd_getenv("RD_UT_HTTP_URL", NULL);
        char *error_url;
        size_t error_url_size;
        cJSON *json, *jval;
        rd_http_error_t *herr;
        rd_bool_t empty;
        rd_kafka_t *rk;

        if (!base_url || !*base_url)
                RD_UT_SKIP("RD_UT_HTTP_URL environment variable not set");

        RD_UT_BEGIN();

        rk             = rd_calloc(1, sizeof(*rk));
        error_url_size = strlen(base_url) + strlen("/error") + 1;
        error_url      = rd_alloca(error_url_size);
        rd_snprintf(error_url, error_url_size, "%s/error", base_url);

        /* Try the base url first, parse its JSON and extract a key-value. */
        json = NULL;
        herr = rd_http_get_json(rk, base_url, &json);
        RD_UT_ASSERT(!herr, "Expected get_json(%s) to succeed, got: %s",
                     base_url, herr->errstr);

        empty = rd_true;
        cJSON_ArrayForEach(jval, json) {
                empty = rd_false;
                break;
        }
        RD_UT_ASSERT(!empty, "Expected non-empty JSON response from %s",
                     base_url);
        RD_UT_SAY(
            "URL %s returned no error and a non-empty "
            "JSON object/array as expected",
            base_url);
        cJSON_Delete(json);


        /* Try the error URL, verify error code. */
        json = NULL;
        herr = rd_http_get_json(rk, error_url, &json);
        RD_UT_ASSERT(herr != NULL, "Expected get_json(%s) to fail", error_url);
        RD_UT_ASSERT(herr->code >= 400,
                     "Expected get_json(%s) error code >= "
                     "400, got %d",
                     error_url, herr->code);
        RD_UT_SAY(
            "Error URL %s returned code %d, errstr \"%s\" "
            "and %s JSON object as expected",
            error_url, herr->code, herr->errstr, json ? "a" : "no");
        /* Check if there's a JSON document returned */
        if (json)
                cJSON_Delete(json);
        rd_http_error_destroy(herr);
        rd_free(rk);

        RD_UT_PASS();
}
